package com.cluster.hadoop.api;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import javax.xml.bind.SchemaOutputResolver;
import java.io.IOException;

public class WordCountDriver {

    public static class WordCountDriverMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Reuse Writable instances across map() calls — the standard Hadoop
        // idiom that avoids allocating two objects per input record.
        private final Text outKey = new Text();
        private final IntWritable one = new IntWritable(1);

        /**
         * Emits (word, 1) where "word" is the first space-separated token of
         * tab-delimited column 11 of the input line.
         *
         * <p>Lines with fewer than 12 tab-separated fields are skipped rather
         * than swallowed via a broad catch — the original caught Exception and
         * printed the raw line, which also hid genuine write failures.
         *
         * @param key     byte offset of the line within the input split (unused)
         * @param value   one line of input text
         * @param context Hadoop context used to emit the (word, 1) pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split("\t");
            // Skip malformed lines that lack the expected column.
            if (fields.length <= 11) {
                return;
            }
            String word = fields[11].split(" ")[0];
            outKey.set(word);
            context.write(outKey, one);
        }
    }

    public static class WordCountDriverReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        /**
         * Sums the partial counts emitted by the mappers for one word and
         * writes (word, total) to the job output.
         *
         * @param key     the word
         * @param values  partial counts for this word (one per mapper emit)
         * @param context Hadoop context used to emit the final total
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int total = 0;
            java.util.Iterator<IntWritable> it = values.iterator();
            while (it.hasNext()) {
                total += it.next().get();
            }
            context.write(key, new IntWritable(total));
        }
    }

    /**
     * Configures and submits the word-count job.
     *
     * <p>Usage: {@code WordCountDriver [inputPath [outputPath]]}. When omitted,
     * the original hard-coded local paths are used as defaults, keeping the
     * previous behavior backward compatible.
     *
     * @param args optional: args[0] = input path, args[1] = output path
     * @throws Exception if job setup, filesystem access, or submission fails
     */
    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();

        // BUG FIX: the original called Job.getInstance() with no arguments,
        // silently ignoring cfg; pass it so its settings actually apply.
        Job job = Job.getInstance(cfg);
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountDriverMapper.class);
        job.setReducerClass(WordCountDriverReduce.class);

        // Mapper output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Reducer (final) output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // BUG FIX: "file:\\C:\\..." is a malformed file URI (backslashes after
        // the scheme); a local-file URI uses forward slashes: file:///C:/...
        String input = args.length > 0 ? args[0]
                : "file:///C:/Users/XBZX/Downloads/part-m-00000";
        String output = args.length > 1 ? args[1] : "file:///D:/worlCount";

        // Input source (could also be HBase, Redis, MySQL, S3, ...).
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path(input));

        // Output sink.
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outPath = new Path(output);
        // Resolve the filesystem FROM the path so the scheme always matches;
        // FileSystem.get(cfg) returns the default FS, which may differ.
        FileSystem fs = outPath.getFileSystem(cfg);
        // MapReduce refuses to start if the output dir exists; remove it first.
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        TextOutputFormat.setOutputPath(job, outPath);
        job.setNumReduceTasks(2);

        // Submit and block until completion; exit nonzero on failure.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
